Author: Fernando Felix do Nascimento Junior
Last update: 24/06/2019
This notebook is divided in the following topics:
Summary:
- Load libs and modules
- Load raw data sets
- Split actual and addressable customers
- EDA
- Descriptive statistics analysis
- Customer type size analysis
- Outlier analysis
- Univariate distribution analysis
- Model building
- Customer segmentation
- Decision-tree classifier
- Customer scorer (Pearson similarity)
- Generate Deliverables
- Deliverable 1
- Deliverable 2
- Sanity Check
- Improvements (TODO)
- Cluster based classifier and scorer (deprecated)
It runs on top of IBM Watson Studio with the following hardware and software config:
Link (Private):
Run the requirements in the terminal:
pip install -r requirements.txt
from config import *
from utils import *
from viz import *
from clustering import *
from classifier import *
Let's load the raw datasets from customer CRM and Neoway firmographic.
# Load the two raw inputs as DataFrames from local CSV copies of the gists below.
# NOTE(review): load_csv_as_dataframe is project-local (utils); presumably it reads
# a headered CSV into a Spark DataFrame — confirm delimiter/schema handling there.
# customer_crm = load_dataframe_from_url('https://gist.githubusercontent.com/fernandojunior/e30bbab8298fb8a57c78f52079503fd8/raw/0f8995acab136324237b17a670fa083db674462d/customer_CRM_2019-05-17.csv')
# neoway_db = load_dataframe_from_url('https://gist.githubusercontent.com/fernandojunior/e30bbab8298fb8a57c78f52079503fd8/raw/0f8995acab136324237b17a670fa083db674462d/Neoway_database_2019-05-17.csv')
customer_crm = load_csv_as_dataframe('customer_CRM_2019-05-17.csv')
neoway_db = load_csv_as_dataframe('Neoway_database_2019-05-17.csv')
Inspect customer_crm dataset (dimension, schema, sample):
customer_crm = inspect_dataframe(customer_crm, n=3)
Inspect neoway_db dataset (dimension, schema, sample):
neoway_db = inspect_dataframe(neoway_db, n=3)
Notes:
In this section, we will carry out the following activities:
# Merge CRM customers into the Neoway base and flag each row: rows present in the
# CRM are actual customers (addressable=False); everyone else is an addressable prospect.
customer_crm = customer_crm.withColumn('addressable', F.lit(False))
all_customers = (
    neoway_db
    .join(customer_crm, ['id'], 'left_outer')
    .withColumn('addressable', F.coalesce(F.col('addressable'), F.lit(True)))
    .distinct()  # drop duplicated rows
)
all_customers.groupBy('addressable').agg(F.count('*')).show()
Notes:
Now, let's split the all_customers dataset into actual_customers and addressable_customers datasets.
# Partition the merged base into the two working datasets by the addressable flag.
actual_customers = all_customers.where(F.col('addressable') == False)
addressable_customers = all_customers.where(F.col('addressable') == True)

# Materialize each dataset to parquet so Spark's lineage/DAG is truncated
# and later stages re-read cheap checkpoints instead of recomputing.
all_customers = checkpoint(all_customers, 'all_customers.parquet')
actual_customers = checkpoint(actual_customers, 'actual_customers.parquet')
addressable_customers = checkpoint(addressable_customers, 'addressable_customers.parquet')
Before performing EDA, let's identify the features and customer types:
feature_cols = ['feat_0', 'feat_1', 'feat_2', 'feat_3', 'feat_4', 'feat_5', 'feat_6', 'feat_7', 'feat_8', 'feat_9']
# Distinct, non-null customer types observed in the merged dataset.
customer_type_list = [
    row.type
    for row in all_customers.select('type').distinct().filter(F.col('type').isNotNull()).collect()
]
print('Feature columns: ', feature_cols)
print('Customer types:', customer_type_list)
Let's summarize the descriptive statistics for each dataset: actual_customers, addressable_customers.
print('Descriptive statistics for actual customers dataset:')
# NOTE(review): describe() is project-local and its result is reassigned — it
# presumably returns the same (possibly cached) DataFrame; confirm in utils.
actual_customers = describe(actual_customers, feature_cols)
print('Descriptive statistics for addressable customers dataset:')
addressable_customers = describe(addressable_customers, feature_cols)
Notes:
Let's analyse the number of customers by type for each dataset: actual customers, addressable customers, all customers.
grouped_bar_plot([actual_customers, addressable_customers, all_customers], ['actual customers', 'addressable customers', 'all customers'], 'type')
Notes:
Let's perform a simple outlier analysis by applying boxplot for each customer type on merged dataset (actual dataset + addressable dataset).
# One boxplot grid per customer type (on the merged dataset) to spot outliers.
for ctype in customer_type_list:
    print('Boxplot for ', ctype)
    boxplots(all_customers.where(F.col('type') == ctype), ctype and all_customers and feature_cols and feature_cols)
Notes:
The main goal of k-means is to decrease the data variability by grouping similar items. To find the optimal number of k, we will use a custom elbow method, which aims to analyze the proportion of how much the WSSE decreases over the TSSE for each k.
# Sweep k up to max_k=70 and plot how the WSSE/TSSE ratio shrinks per k (custom
# elbow method). NOTE(review): elbow_curve/elbow_curve_plot are project-local
# (clustering/viz modules); SEED comes from config.
k_list, k_scores = elbow_curve(actual_customers, feature_cols, max_k=70, seed=SEED)
elbow_curve_plot(k_list, k_scores, variability_reduction_rate=True)
Notes:
The following figure summarizes the size of the actual customer in each cluster.
# Number of clusters chosen from the elbow analysis above; named instead of
# hard-coding 11 at the call site so it is obvious what to tune.
N_CLUSTERS = 11

# Drop any stale cluster assignment before re-clustering, then attach the new
# k-means assignment under the 'cluster' column.
actual_customers = actual_customers.drop('cluster')
cluster_results = cluster_data(actual_customers, feature_cols, N_CLUSTERS, seed=SEED)
actual_customers = cluster_results['predictions'].withColumnRenamed('prediction', 'cluster')
grouped_bar_plot([actual_customers], ['actual customers'], 'cluster')
Note:
Based on clustered data, we will train a decision tree classifier:
# Column names for the supervised step: the k-means cluster is the target.
label_col = 'cluster'
prediction_label = 'prediction'

# 70/30 train/test split, both checkpointed to parquet.
training, testing = actual_customers.randomSplit([0.7, 0.3], seed=SEED)
training = checkpoint(training, 'training.parquet')
testing = checkpoint(testing, 'testing.parquet')

# Balance the training classes, then checkpoint the resampled set and show
# the resulting per-cluster counts.
training = resample(training, label_col, seed=SEED)
training = checkpoint(training, 'training_resampled.parquet')
grouped_bar_plot([training], ['training'], label_col)

# Hyper-parameter grid for the decision tree search.
param_grid = {
    'maxDepth': [5, 8, 13],
    'maxBins': [13, 21, 34],
    'impurity': ['entropy', 'gini'],
    'seed': [SEED],
}
# Grid-search a DecisionTreeClassifier over param_grid, selecting by accuracy.
# NOTE(review): train_multiclass_classifier is project-local (classifier module);
# the returned testing/training DataFrames presumably carry prediction columns —
# confirm the tuple element order against classifier.py before reordering anything.
(model, testing_score, training_score, testing, training, best_param_map) = train_multiclass_classifier(
    DecisionTreeClassifier,
    training,
    testing,
    feature_cols,
    label_col,
    param_grid,
    metric='accuracy')
# Large gap between the two scores would indicate over/under-fitting.
print('testing_score vs training_score:', testing_score, training_score)
Notes:
In this section we will perform the following tasks:
@F.udf("double")
def udf_corr_scoring(point, cluster):
    '''Pearson correlation between a feature vector and the centroid of its
    predicted cluster, looked up in the module-level `centers` frame (one
    column per cluster). Returns the correlation as a plain float.'''
    vector = pd.Series([float(value) for value in point])
    return float(centers.corrwith(vector)[int(cluster)])
# Score every addressable customer: predict its cluster with the trained tree,
# then attach the centroid-correlation score.
addressable_customers = addressable_customers.drop('features', 'prediction', 'rawPrediction', 'probability')
assembled = assemble_vector(addressable_customers, feature_cols, 'features')
addressable_customers = model.transform(assembled)
addressable_customers = addressable_customers.withColumn('score', udf_corr_scoring(F.array(feature_cols), prediction_label))
addressable_customers.show(5)
def save_deliberable(df, filename):
    '''Write df as a single headered CSV file into the project's COS bucket.

    Note: the name keeps the original "deliberable" spelling because callers
    throughout this notebook depend on it.
    '''
    target = cos.url(filename, 'potentialmarketranking-donotdelete-pr-cej2kccafd4zxc')
    df.repartition(1).write.mode('overwrite').option("header", "true").csv(target)
def load_deliberable(filename):
    '''Read a deliverable CSV (with header) back from the project's COS bucket.

    Fix: use the public 'csv' data-source alias instead of the internal class
    path org.apache.spark.sql.execution.datasources.csv.CSVFileFormat, which is
    a private implementation detail and can break across Spark versions.
    '''
    target = cos.url(filename, 'potentialmarketranking-donotdelete-pr-cej2kccafd4zxc')
    return spark.read.format('csv').option('header', 'true').load(target)
Finally, let's generate the challenge deliverables.
# Persist the challenge deliverables: id lists for each split, plus the
# addressable customers ranked by correlation score (best prospects first).
save_deliberable(addressable_customers.select('id'), 'addressable_ids.csv')
save_deliberable(training.select('id'), 'training_ids.csv')
save_deliberable(testing.select('id'), 'testing_ids.csv')
save_deliberable(addressable_customers.select('id', 'score').orderBy(F.desc('score')), 'addressable_ranking.csv')
print('OK')
Let's check if the deliverable datasets were generated and saved correctly.
# Read each deliverable back from COS and show a few rows as a smoke test.
print('training_ids.csv')
load_deliberable('training_ids.csv').show(5)
print('testing_ids.csv')
load_deliberable('testing_ids.csv').show()
print('addressable_ranking.csv')
load_deliberable('addressable_ranking.csv').show()
TODO:
- Redistribute customers from small clusters into other clusters
- Persona characterization (improve explainability) - analyze centroids to describe and differentiate each cluster behavior
- Create files to store python modules: utils, config, clustering, etc.
- Save models (clustering, classifier)
- Use spark pipelines
- Compute processing time for clustering and predictions
- Try to use Euclidean distance to compute score
Clustering is not able to classify new company instances by itself. Instead, a simple classification model was built on top of the clustering result (reference). This was conducted with the following algorithm:
centers = pd.DataFrame(cluster_results['centers']).T
def assemble_vector(dataframe, input_cols, output_col):
    '''
    Combine the given list of columns into a single vector column of a
    PySpark DataFrame, dropping rows that contain nulls first.
    https://spark.apache.org/docs/latest/ml-features.html#vectorassembler
    '''
    cleaned = dataframe.na.drop()
    return VectorAssembler(inputCols=input_cols, outputCol=output_col).transform(cleaned)
def find_cluster(point):
    '''
    Find the most correlated cluster centroid for a given point.

    Computes the pairwise Pearson correlation between each centroid column of
    the module-level `centers` frame and the point, ranks the correlations in
    descending order, and returns a (cluster, correlation) tuple for the best
    match.

    Refs:
    - https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.corrwith.html
    - https://stackoverflow.com/a/38468711/4159153
    - https://stackoverflow.com/a/35459076/4159153

    TODO: use Euclidean distance to compute the score instead, e.g.:

    ```
    from scipy.spatial import distance
    distance.euclidean((1, 2, 3), (4, 5, 6))
    ```
    '''
    vector = pd.Series([float(value) for value in point])
    ranked = centers.corrwith(vector).sort_values(ascending=False)
    best_cluster, best_corr = next(iter(ranked.items()))  # top of the ranking
    return (float(best_cluster), float(best_corr))
# Spark UDF: most correlated cluster id for a feature array (see find_cluster).
@F.udf("double")
def udf_estimate_cluster(point):
    return find_cluster(point)[0]
# Spark UDF: correlation of the point with its best-matching centroid.
@F.udf("double")
def udf_estimate_score(point):
    return find_cluster(point)[1]
def predict_cluster(df):
    '''Attach correlation-based 'prediction' and 'score' columns to df,
    assembling (and then dropping) a temporary 'features' vector column.'''
    assembled = assemble_vector(df, feature_cols, 'features')
    return (
        assembled
        .withColumn('prediction', udf_estimate_cluster(F.col('features')))
        .withColumn('score', udf_estimate_score(F.col('features')))
        .drop('features')
    )
# NOTE(review): this deprecated flow reuses the 'training.parquet'/'testing.parquet'
# checkpoint file names from the decision-tree section above, overwriting them —
# rename the files if both flows are run in the same session.
training = checkpoint(actual_customers.distinct(), 'training.parquet')
testing = checkpoint(addressable_customers.distinct(), 'testing.parquet')
training = predict_cluster(training)
testing = predict_cluster(testing)
print('Actual customers predictions based on cluster centroids correlation')
training.show(3)
print('Addressable customers predictions based on cluster centroids correlation')
testing.show(3)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
def evaluate_multiclass_model(predictions_data, label_col, prediction_col='prediction', metric_name='accuracy'):
    '''Score a predictions DataFrame with Spark's multiclass evaluator and
    return the metric value (accuracy by default).'''
    evaluator = MulticlassClassificationEvaluator(
        labelCol=label_col,
        predictionCol=prediction_col,
        metricName=metric_name,
    )
    return evaluator.evaluate(predictions_data)
# Evaluate the centroid-correlation predictions on the training set for each metric.
eval_metrics = ['accuracy', 'weightedPrecision', 'weightedRecall']
label_col = 'cluster'
prediction_col = 'prediction'
for metric in eval_metrics:
    # BUG FIX: `metric` was never forwarded to the evaluator, so every
    # iteration silently reported accuracy regardless of the label printed.
    global_train_score = evaluate_multiclass_model(training, label_col, prediction_col, metric_name=metric)
    print('{metric} for training : {global_train_score}'.format(metric=metric, global_train_score=global_train_score))
Sanity check
# Sanity check: per-group feature means should look alike between the labelled
# clusters and the correlation-based predictions, on both datasets.
for frame, group_col in ((training, 'cluster'), (training, 'prediction'), (testing, 'prediction')):
    frame.groupBy(group_col).agg({col: "avg" for col in feature_cols}).show()